from ma.commons.core.newjobaction import NewJobAction
from ma.commons.core.newmaptaskaction import NewMapTaskAction
from ma.commons.core.newreducetaskaction import NewReduceTaskAction
from ma.commons.core.constants import * 
from ma.commons.core.nodetrackerstatus import *

import logging
import logging.config
import ma.log
import ma.const

from ma.fs.dfs.dfsflags import *
from ma.fs.dfs._hdfs import HDFS
from ma.mrplus_master.core.taskscheduler import TaskScheduler

from ma.commons.core.completejobsaction import CompleteJobsAction
from threading import Thread

from .nodetracker import *
import time
import pickle
import sys
import os
import re


class Master(object):
	"""Master node of the MR+ framework.

	Responsibilities (as implemented below):
	1. tracking MR jobs and their map/reduce task stubs
	2. tracking MR tasks on the various NTs (NodeTrackers)
	3. receiving NT heartbeats and handing out new tasks
	4. reporting completed jobs/structs back to the NTs
	5. interacting with the HDFS on the NTs' behalf
	"""
	
	# NOTE(review): currently unused anywhere in this class; kept to avoid
	#  breaking any external name-mangled access -- TODO confirm and remove
	__max_reduce_level = 999999
	
	
	def __init__(self, forced_job_id=None):
		"""Initiate the Master and all its data structures.

		forced_job_id -- when given, the DfsFlags layer is forced to use
		this job id (useful for replaying/testing a specific job).
		"""
		
		# initializing the logger
		self.__log = ma.log.get_logger('ma.master')
		
		# next tracker id to hand out in registerNT()
		self.newNTId = 0
		
		# layer that speaks to the HDFS
		host = ma.const.XmlData.get_str_data(ma.const.xml_hdfs_host)
		port = ma.const.XmlData.get_int_data(ma.const.xml_hdfs_port)
		self.hdfs = HDFS(host, port)
		
		self.dfsflags = DfsFlags(self.hdfs)
		if forced_job_id is not None:
			self.__log.warning('Forcing job id: %d', forced_job_id)
			self.dfsflags.force_job_id(forced_job_id)
		
		# double dict: outer keyed by job_id, inner by struct_id
		#  {job_id: {struct_id: [[picked-up/done, (map_id, struct_id, [inputs (filename, offset, size)]), NT ip], ...]}}
		self.maps_of_jobs = {}
		
		# double dict: outer keyed by job_id, inner by struct_id
		#  {job_id: {struct_id: [[done/picked-up (assigned to a reduce), (reduce_id, reduce_level, input task tuples, struct_id), NT ip], ...]}}
		self.reduces_of_jobs = {}
		
		list_of_jobs = self.dfsflags.get_active_job_list()
		
		# no apparent use for it!
		self.threads = []
		
		# jobs dict maintained for the scheduler
		self.jobs = {}
		
		# jobs/structs completed since an NT's last heartbeat, kept per NT
		#  and returned on the heartbeat call:
		#  NT_id -> [] of recently completed (job_id, struct_id)
		self.complete_jobs = {}
		
		# double dict recording which structs are complete in a job:
		#  job_id -> {struct_id -> Done (True/False)}
		self.completed_structs_job = {}
		
		# holds info on active jobs (from the DFS); this loop initializes
		#  the per-job structures used throughout the master
		for job in list_of_jobs:
			self.localizeJob(job)
			self.addJobToDict(job)
			self.maps_of_jobs[job] = {}
			self.reduces_of_jobs[job] = {}
		
		# order in which structures appear in the map flags file,
		#  used to choose maps in order: job_id -> [struct_id, ...]
		self.ordered_structs = {}
		
		# creating map stubs
		self.createMapStubs(list_of_jobs)
		
		# TODO: Fix interaction with task sched.
		#  the task scheduler suggests what job to be picked up next
		self.task_scheduler = TaskScheduler(self.jobs)
		
		# the three dicts below map task_id ("jobid_taskid_M/R") -> TIP
		#  (either a real action object or a placeholder string)
		self.running_tasks = {}
		self.failed_tasks = {}
		self.completed_tasks = {}
		
		# which NT a task runs on: task_id -> NT_id
		self.task_to_NT = {}
		
		# NT_id -> NodeTrackerStatus
		self.live_NTs = {}
		
		self.check = 0
		
		# TODO: REMOVE temp output writing to file
		#  create/truncate the temp file tracking the final reduces
		#  (with-statement guarantees the handle is closed)
		with open('reduce_final.out', 'w+'):
			pass
	
	
	def addJobToDict(self, job_id):
		"""Add a job to the active jobs dict and init its struct-completion map."""
		
		# job-specific maps-to-reduces scheduling ratio from the jobs XML
		maps_to_red_ratio = ma.const.JobsXmlData.get_int_data(ma.const.xml_map_to_reduce_schedule_ratio, job_id)
		# cluster-wide multiplying factor applied to the ratio
		schedule_multiply_factor = ma.const.XmlData.get_int_data(ma.const.xml_schedule_multiply_factor)
		
		self.__log.info("Job_id %s | Map to reduce ratio: %d | Schedule multiply ratio: %d", str(job_id), maps_to_red_ratio, schedule_multiply_factor)
		
		# this dict is maintained to be passed to the TaskScheduler:
		#  job_id -> [scaled map/reduce scheduling ratio, count of maps
		#  scheduled, {struct_id -> maps complete (True/False)}]
		self.jobs[job_id] = [maps_to_red_ratio * schedule_multiply_factor, 0, {}]
		self.completed_structs_job[job_id] = {}
	
	
	def createMapStubs(self, list_of_jobs):
		"""Create map stubs for each given job via the DFS flags layer.

		Initializes maps_of_jobs (maps per job/struct) and the per-struct
		"maps complete" flags in both self.jobs and completed_structs_job.
		"""
		
		for job_id in list_of_jobs:
			# get all maps from the map flags; 'maps' is a list of tuples
			#  (map_id, struct_id, [inputs (filename, offset, size)])
			maps, maps_complete = self.dfsflags.choose_map_tasks(self.dfsflags.CHOOSE_ALL_MAPS, job_id)
			
			self.maps_of_jobs[job_id] = {}
			self.__log.info("%d MAPS reported by DFS for job id %s", len(maps), str(job_id))
			
			# create the ordered structure list
			self.create_ord_struct_list(job_id, maps)
			self.__log.debug("Jobs struct ordering for job id %s : %s", str(job_id), str(self.ordered_structs[job_id]))
			
			# store each newly created map stub in self.maps_of_jobs:
			#  {job_id: {struct_id: [[picked-up/done, (map_id, struct_id, inputs), NT ip], ...]}}
			for map_id, struct_id, inputs in maps:
				if struct_id not in self.maps_of_jobs[job_id]:
					self.maps_of_jobs[job_id][struct_id] = []
					
					# initializing the structures in a job
					self.completed_structs_job[job_id][struct_id] = False
					
					# flag saying whether maps for this struct are complete
					self.jobs[job_id][2][struct_id] = False
				
				self.maps_of_jobs[job_id][struct_id].append([0, (map_id, struct_id, inputs), ""])
	
	
	def create_ord_struct_list(self, job_id, maps_list):
		"""Build the ordered, de-duplicated struct list for a given job_id.

		maps_list comes straight from the map flags; structs are kept in
		first-seen order. The result is stored in self.ordered_structs and
		is used when maps must be chosen in struct order.
		"""
		
		# empty the spot in the dictionary for the current job_id
		self.ordered_structs[job_id] = []
		
		# iterate through the maps keeping the first occurrence of each struct
		for map_entry in maps_list:
			struct_id = map_entry[1]
			
			if struct_id not in self.ordered_structs[job_id]:
				self.ordered_structs[job_id].append(struct_id)
	
	
	def createReduceStub(self, job_id, reduce_id, reduce_level, struct_id, input_data):
		"""Add a reduce stub to the reduces_of_jobs dict:
		{job_id: {struct_id: [[done/picked-up (assigned to a reduce),
		(reduce_id, reduce_level, input task tuples, struct_id), NT ip], ...]}}

		input_data is the list of (job_id, MAP/REDUCE, task_id, NT ip)
		tuples that make up this reduce TIP's input.

		Raises RuntimeError if job_id is unknown to reduces_of_jobs.
		"""
		
		# mark every input task as being complete AND picked up (state 3)
		#  by this reduce stub
		for item in input_data:
			jobid = item[0]
			
			# pick the task list matching the input task's type
			if item[1] == MAP:
				task_list = self.maps_of_jobs[jobid][struct_id]
			else:
				task_list = self.reduces_of_jobs[jobid][struct_id]
			
			for entry in task_list:
				if item[2] == entry[1][0]:
					entry[0] = 3
					break
		
		# record the newly created reduce stub
		if job_id not in self.reduces_of_jobs:
			# was a bare `raise` (which surfaces as an opaque
			#  "No active exception to re-raise" RuntimeError)
			raise RuntimeError('createReduceStub: unknown job id %s' % str(job_id))
		
		if struct_id not in self.reduces_of_jobs[job_id]:
			self.reduces_of_jobs[job_id][struct_id] = []
		
		self.reduces_of_jobs[job_id][struct_id].append([0, (reduce_id, reduce_level, input_data, struct_id), ""])
		self.__log.debug("REDUCE STUB %s job_id %s created for LEVEL %d and STRUCT ID %s with %d inputs", str(reduce_id), str(job_id), reduce_level, str(struct_id), len(input_data))
	
	
	def run(self):
		"""Run forever; starts before any other thread begins.
		(Currently a no-op placeholder.)
		"""
		
		pass
	
	
	def retrieveTaskJobId(self, task_id):
		"""Split a task_id string into (job_id, task_id, map_or_reduce).

		The string is the TASK_ID_SEP-joined form built by
		constructTaskId(); job and task ids come back as ints, the
		map-or-reduce tag as a string.
		"""
		
		parts = task_id.split(TASK_ID_SEP)
		
		# (a dead loop that collected loop *indices* into an unused list
		#  was removed here)
		return int(parts[0]), int(parts[1]), parts[2]
	
	
	def constructTaskId(self, job_id, task_id, m_or_r):
		"""Build the single task-id string that retrieveTaskJobId() decodes."""
		
		return TASK_ID_SEP.join((str(job_id), str(task_id), str(m_or_r)))
	
	
	def getActiveJobs(self):
		"""Refresh the jobs dict from the HDFS active-job list.

		New jobs are localized and given map stubs; jobs no longer active
		are dropped from all per-job dicts.
		"""
		
		self.__log.debug("before refreshing jobs in queue: %s", str(self.jobs))
		
		new_jobs = []
		list_of_jobs = self.dfsflags.get_active_job_list()
		
		# filling in active jobs list from the HDFS
		for i in list_of_jobs:
			if i not in self.jobs:
				self.addJobToDict(i)
				new_jobs.append(i)
				self.maps_of_jobs[i] = {}
				self.reduces_of_jobs[i] = {}
				self.localizeJob(i)
		
		self.createMapStubs(new_jobs)
		self.task_scheduler.refreshJobsInfo(list_of_jobs)
		
		inactive_jobs = []
		
		# collect inactive jobs; self.jobs itself is mutated only after the
		#  iteration finishes
		for j in self.jobs:
			if j not in list_of_jobs:
				inactive_jobs.append(j)
				del self.maps_of_jobs[j]
				del self.reduces_of_jobs[j]
		
		for i in inactive_jobs:
			del self.jobs[i]
		
		self.__log.info("after refreshing jobs in queue: %s", str(self.jobs))
	
	
	def mark_map_complete(self, task_id, struct_id=STRUCT_ID_DEFAULT, nodetracker_ip="0.0.0.0"):
		"""Mark a map task complete; also gather any completed maps not yet
		picked by a reduce and turn them into level-1 reduce stubs.
		maps_of_jobs layout:
		{job_id: {struct_id: [[picked-up/done, (map_id, struct_id, inputs), NT ip], ...]}}

		Returns False when the task was already marked complete.
		"""
		
		jobid, taskid, map_or_reduce = self.retrieveTaskJobId(task_id)
		
		# how many completed maps feed a single reduce
		max_map_to_red_inputs = ma.const.JobsXmlData.get_int_data(ma.const.xml_map_reduce_ratio, jobid)
		
		# move task from the running list into the completed list
		if task_id in self.running_tasks:
			self.completed_tasks[task_id] = self.running_tasks.pop(task_id)
		else:
			self.__log.warning("Map Task %s struct-id %s has already been marked completed", str(task_id), str(struct_id))
			return False
		
		# mark task as done (state 2) and remember which NT produced it
		for item in self.maps_of_jobs[jobid][struct_id]:
			if item[1][0] == taskid:
				item[0] = 2
				item[2] = nodetracker_ip
				break
		
		# locals for building reduce stubs out of the finished maps
		count_maps = 0
		reduce_tasks_list = []
		input_data = []
		maps_undone = False
		reduce_id = 0
		new_reduces_created = 0
		
		# reduce ids are allocated progressively across all structs of the job
		for struct in self.reduces_of_jobs[jobid]:
			reduce_id += len(self.reduces_of_jobs[jobid][struct])
		
		# make reduce stubs from maps that are done and not yet assigned
		for map_data in self.maps_of_jobs[jobid][struct_id]:
			
			if map_data[0] == 2:
				# done but unclaimed: candidate input for a new reduce
				input_data.append((jobid, MAP, map_data[1][0], map_data[2]))
				count_maps += 1
			elif map_data[0] < 2:
				# some map of this job/struct is not done yet
				maps_undone = True
			
			# enough inputs collected to stub a level-1 reduce
			if len(input_data) == max_map_to_red_inputs:
				reduce_tasks_list.append((jobid, reduce_id, 1, struct_id, input_data))
				reduce_id += 1
				count_maps = 0
				new_reduces_created += 1
				
				input_data = []
		
		# stub the left-over maps once ALL maps are complete but fewer than
		#  max_map_to_red_inputs inputs remain
		if not maps_undone and 0 < len(input_data) < max_map_to_red_inputs:
			reduce_tasks_list.append((jobid, reduce_id, 1, struct_id, input_data))
		
		for reduce_task in reduce_tasks_list:
			self.createReduceStub(reduce_task[0], reduce_task[1], reduce_task[2], reduce_task[3], reduce_task[4])
		
		self.__log.info("Mark MAP complete called for %s job-id %s, struct-id %s | %d reduce stubs created", str(task_id), str(jobid), str(struct_id), new_reduces_created)
		
		self.checkAndMarkMapsComplete(jobid, struct_id)
	
	
	def mark_reduce_complete(self, task_id, struct_id=STRUCT_ID_DEFAULT, nodetracker_ip="0.0.0.0"):
		"""Mark a reduce task complete; also gather completed reduces not yet
		consumed and stub higher-level reduces out of them. When every map
		and reduce of the struct is done and stubbed, the struct is marked
		complete.
		reduces_of_jobs layout:
		{job_id: {struct_id: [[done/picked-up, (reduce_id, reduce_level, inputs, struct_id), NT ip], ...]}}

		Returns False when the task was already marked complete.
		"""
		
		jobid, taskid, map_or_reduce = self.retrieveTaskJobId(task_id)
		self.__log.info("Mark REDUCE complete called for %s job-id %s, struct-id %s", str(task_id), str(jobid), str(struct_id))
		
		# how many completed reduces feed one higher-level reduce
		max_red_to_red_inputs = ma.const.JobsXmlData.get_int_data(ma.const.xml_reduce_input_ratio, jobid)
		
		# move task from the running list into the completed list
		if task_id in self.running_tasks:
			self.completed_tasks[task_id] = self.running_tasks.pop(task_id)
		else:
			self.__log.warning("Reduce Task %s struct-id %s has already been marked completed", str(task_id), str(struct_id))
			return False
		
		# mark task as done (state 2) and remember which NT produced it
		for item in self.reduces_of_jobs[jobid][struct_id]:
			if item[1][0] == taskid:
				item[0] = 2
				item[2] = nodetracker_ip
				break
		
		# locals for building higher-level reduce stubs
		count_reduces = 0
		reduce_tasks_list = []
		input_data = []
		reduce_id = 0
		
		# reduce ids are allocated progressively across all structs of the job
		for struct in self.reduces_of_jobs[jobid]:
			reduce_id += len(self.reduces_of_jobs[jobid][struct])
		
		reduce_level = 1
		reduces_complete = True
		# all reduces for this struct complete and stubbed
		computation_complete_for_struct = True
		
		# have all maps been completed for the struct in question?
		maps_complete = self.jobs[jobid][2][struct_id] is True
		
		# make a reduce stub from reduces that are done and unassigned
		for reduce_data in self.reduces_of_jobs[jobid][struct_id]:
			if reduce_data[0] == 2:
				# done but unclaimed: candidate input for a new reduce
				input_data.append((jobid, REDUCE, reduce_data[1][0], reduce_data[2]))
				count_reduces += 1
				
				# the new stub sits one level above its highest input
				if reduce_level <= reduce_data[1][1]:
					reduce_level = reduce_data[1][1] + 1
				
				if reduce_data[1][0] != taskid:
					computation_complete_for_struct = False
			
			# a reduce still undone (0) or merely picked up (1)
			elif reduce_data[0] == 0 or reduce_data[0] == 1:
				reduces_complete = False
				computation_complete_for_struct = False
			
			# enough inputs collected to stub a higher-level reduce
			if count_reduces >= max_red_to_red_inputs:
				reduce_tasks_list.append((jobid, reduce_id, reduce_level, struct_id, input_data))
				reduce_id += 1
				count_reduces = 0
				reduce_level = 1
				input_data = []
		
		# the final reduce of the struct is stubbed once all reduces are
		#  complete, all maps are done, and the leftovers are fewer than
		#  the red-to-red input ratio
		if reduces_complete and maps_complete and count_reduces in range(2, max_red_to_red_inputs):
			reduce_tasks_list.append((jobid, reduce_id, reduce_level, struct_id, input_data))
			self.__log.info('///// Stubbing last REDUCE task id %s for job-id %s struct-id %s at level %d with %d inputs', str(reduce_id), str(jobid), str(struct_id), reduce_level, count_reduces)
		
		# the estimation-ignore XML setting is loop invariant: parse once.
		#  format (per the regex): "<level>" optionally followed by
		#  ",<struct>" entries naming structs NOT to ignore
		if reduce_tasks_list:
			est_ignr_str = ma.const.JobsXmlData.get_str_data(ma.const.xml_estimation_ignore, jobid)
			matched_params = re.findall(r'^(\d+)|,(\d+)', est_ignr_str)
			level_to_ignore = matched_params[0][0]
			# was list(map(operator.itemgetter(1), ...)) but 'operator'
			#  was never imported -> NameError at runtime
			struct_not_to_ignore = [m[1] for m in matched_params[1:]]
		
		for reduce_task in reduce_tasks_list:
			self.createReduceStub(reduce_task[0], reduce_task[1], reduce_task[2], reduce_task[3], reduce_task[4])
			
			# if the xml says that structs need to be completed prematurely
			if level_to_ignore != "0" and reduce_task[2] >= int(level_to_ignore):
				# if all structs are to be ignored, or this struct is not
				#  in the keep list
				if struct_not_to_ignore == [] or reduce_task[3] not in struct_not_to_ignore:
					self.__log.warning("Skipping structure %s reduces to mimic estimation away from the needed result", reduce_task[3])
					computation_complete_for_struct = True
		
		# all reduces complete and stubbed, and all maps done: struct done
		if computation_complete_for_struct and maps_complete:
			self.markStructComplete(jobid, struct_id)
			last_level = -1
			last_id = -1
			
			# find the id and level of the reduce holding the final result
			#  of this particular structure (the highest-level one)
			for reduce in self.reduces_of_jobs[jobid][struct_id]:
				reduce_id = reduce[1][0]
				reduce_level = reduce[1][1]
				if last_level < reduce_level:
					last_level = reduce_level
					last_id = reduce_id
			
			self.__log.info("///////////////// Struct %s job-id %s Complete! /////// Last Reduce: %s, Level %s /////////////////", str(struct_id), str(jobid), str(last_id), str(last_level))
			
			# TODO: REMOVE temp output writing to file
			#  temp writing to final reduce file (closed via with-statement)
			with open('reduce_final.out', 'a') as fd:
				fd.write('Struct ' + str(struct_id) + ', Reduce id ' + str(last_id) + ', Level ' + str(last_level) + ', ' + str(time.time()) + '\n')
	
	
	def mark_task_failed(self, task_id):
		"""Move a task from the running list to the failed list.

		Returns True when the task was running, False otherwise.
		"""
		
		if task_id in self.running_tasks:
			self.failed_tasks[task_id] = self.running_tasks.pop(task_id)
			return True
		return False
	
	
	def choose_map_tasks(self, tasks_to_run, job_id, tracker_id):
		"""Return up to tasks_to_run unassigned map tuples
		(map_id, struct_id, [inputs (filename, offset, size)]) for job_id,
		marking each as picked up and registering it in running_tasks.
		"""
		
		list_of_maps = []
		
		# check if the current job_id is valid
		#  (previously built and sorted a key list only for this membership
		#  test -- a direct dict lookup is equivalent)
		if job_id in self.maps_of_jobs:
			
			# if strict struct ordering is demanded by this particular job
			if ma.const.JobsXmlData.get_int_data(ma.const.xml_order_by_structs, job_id) == 1:
				struct_list = self.ordered_structs[job_id]
			else:
				struct_list = self.maps_of_jobs[job_id]
			
			# choose maps according to the ordering of structs
			for struct_id in struct_list:
				
				# no more tasks wanted
				if tasks_to_run <= 0:
					break
				
				# pick as many undone maps of this struct as still wanted
				for map_entry in self.maps_of_jobs[job_id][struct_id]:
					
					if tasks_to_run <= 0:
						break
					
					# state 0 == map task not yet picked up
					if map_entry[0] == 0:
						# map_entry is [picked-up/done, (map_id, struct_id, inputs), NT ip]
						list_of_maps.append(map_entry[1])
						
						# mark task as picked up
						map_entry[0] = 1
						tasks_to_run -= 1
						task_id = map_entry[1][0]
						
						nodetracker_ip = self.live_NTs[tracker_id].nodetracker_ip
						self.__log.info("Assigned Map task %s with job_id %s, struct-id %s to NT %s (%s)", str(task_id), str(job_id), str(struct_id), str(tracker_id), nodetracker_ip)
						
						task_id = self.constructTaskId(job_id, task_id, MAP)
						# placeholder TIP; replaced by the real action
						#  object in assignNewTasksToNT()
						self.running_tasks[task_id] = "momina"
		
		return list_of_maps
	
	
	def choose_reduce_tasks(self, tasks_to_run, job_id, tracker_id):
		"""Return up to tasks_to_run unassigned reduce tuples
		(reduce_id, level, inputs, struct_id) for job_id, marking each as
		picked up and registering it in running_tasks.
		"""
		
		list_of_reduces = []
		
		# when a reduce level bias is set, higher-level tasks are chosen first
		level_bias = ma.const.JobsXmlData.get_int_data(ma.const.xml_schedule_bias_for_large_levels, job_id)
		
		if job_id in self.reduces_of_jobs:
			
			# if struct ordering is demanded by this particular job
			if ma.const.JobsXmlData.get_int_data(ma.const.xml_order_by_structs, job_id) == 1:
				struct_list = self.ordered_structs[job_id]
			else:
				struct_list = self.reduces_of_jobs[job_id]
			
			for struct_id in struct_list:
				if tasks_to_run <= 0:
					break
				
				# if the struct id is in the reduces made
				if struct_id in self.reduces_of_jobs[job_id]:
					struct_reduce_list = self.reduces_of_jobs[job_id][struct_id]
					
					# sort so high-level reduces are chosen first, if set.
					#  (was sort(key=operator.itemgetter(1), cmp=...):
					#  'operator' was never imported and the cmp keyword
					#  does not exist in Python 3)
					if level_bias == 1:
						# descending by reduce_level (entry[1][1])
						struct_reduce_list.sort(key=lambda entry: entry[1][1], reverse=True)
					
					for reduce_entry in struct_reduce_list:
						if tasks_to_run <= 0:
							break
						
						# reduce_entry is [done/picked-up, (reduce_id,
						#  reduce_level, inputs, struct_id), NT ip];
						#  state 0 == not yet picked up
						if reduce_entry[0] == 0:
							list_of_reduces.append(reduce_entry[1])
							
							reduce_entry[0] = 1
							tasks_to_run -= 1
							task_id = reduce_entry[1][0]
							level_no = reduce_entry[1][1]
							no_inputs = len(reduce_entry[1][2])
							
							nodetracker_ip = self.live_NTs[tracker_id].nodetracker_ip
							self.__log.info("Assigned Reduce task %s with job_id %s, struct-id %s level %s no_inputs %d to NT %s (%s)", str(task_id), str(job_id), str(struct_id), str(level_no), no_inputs, str(tracker_id), nodetracker_ip)
							
							task_id = self.constructTaskId(job_id, task_id, REDUCE)
							# placeholder TIP; replaced by the real action
							#  object in assignNewTasksToNT()
							self.running_tasks[task_id] = "ahmad"
		
		return list_of_reduces
	
	
	def assignNewTasksToNT(self, tasks_to_run, actions, tracker_id):
		"""Assign up to tasks_to_run new tasks to the given NT.

		Asks the TaskScheduler which job and task kind to schedule, books
		the tasks through choose_map_tasks/choose_reduce_tasks, and appends
		the corresponding action objects to the (mutated in place) actions
		list that will be shipped back to the NT.
		"""
		
		# if no tasks are desired by the NT there is nothing to do
		if tasks_to_run == 0:
			return
		
		# get job_id and the suggested task kind from the TaskScheduler
		#  NOTE(review): self.jobs is passed twice, matching the original
		#  call -- confirm against TaskScheduler.scheduleTasks's signature
		job_id, map_or_red = self.task_scheduler.scheduleTasks(self.jobs, tasks_to_run, self.jobs, self.completed_structs_job, self.ordered_structs)
		
		nodetracker_ip = self.live_NTs[tracker_id].nodetracker_ip
		self.__log.info("Scheduling %s tasks of job-id %s for NT %s (%s)", map_or_red, str(job_id), str(tracker_id), nodetracker_ip)
		
		if job_id == -1:
			# means no job in job list
			return
		
		# case scheduler suggests map tasks
		if map_or_red == MAP:
			# returns as many map task tuples as wanted by the params
			list_of_tasks_assigned = self.choose_map_tasks(tasks_to_run, job_id, tracker_id)
			
			self.__log.info("%d Maps scheduled for NT %s (%s)", len(list_of_tasks_assigned), str(tracker_id), nodetracker_ip)
			if list_of_tasks_assigned is None:
				return
			
			# task info tuple contains (map_id, struct_id, [inputs (filename, offset, size)])
			for task_info in list_of_tasks_assigned:
				# make a new MAP TASK action object
				map_task = NewMapTaskAction(task_info[0], job_id, task_info[2], task_info[1])
				
				# add to the actions list shipped to the NT
				actions.append(map_task)
				
				# replace the placeholder TIP with the real action object
				task_id = self.constructTaskId(job_id, task_info[0], MAP)
				self.running_tasks[task_id] = map_task
		
		else: # case scheduler suggests reduce tasks
			# returns as many reduce task tuples as wanted by the params
			list_of_tasks_assigned = self.choose_reduce_tasks(tasks_to_run, job_id, tracker_id)
			
			self.__log.info("%d Reduces scheduled for NT %s (%s)", len(list_of_tasks_assigned), str(tracker_id), nodetracker_ip)
			if list_of_tasks_assigned is None:
				return
			
			for task_info in list_of_tasks_assigned:
				# make a new REDUCE TASK action object
				reduce_task = NewReduceTaskAction(job_id, task_info[0], task_info[2], task_info[1], task_info[3])
				
				# add to the actions list shipped to the NT
				actions.append(reduce_task)
				
				# replace the placeholder TIP with the real action object
				task_id = self.constructTaskId(job_id, task_info[0], REDUCE)
				self.running_tasks[task_id] = reduce_task
	
	
	def heartbeatFromNT(self, taskinfo_string, tasks_to_run, tracker_id):
		"""Handle a periodic heartbeat from an NT.

		taskinfo_string is a pickled list of task info objects describing
		completed/failed tasks on that NT; tasks_to_run is how many fresh
		tasks the NT can take. Returns (pickled actions list, list of jobs
		completed since the NT's previous heartbeat).
		"""
		
		# RPC unmarshalling of arguments.
		#  SECURITY NOTE: pickle.loads on data received over RPC is unsafe
		#  if NTs are not fully trusted; consider a safer wire format
		taskinfo_list = pickle.loads(taskinfo_string)
		
		# list of actions to be returned
		actions = []
		
		# the ip of the nodetracker whose heartbeat is coming (for logging)
		nodetracker_ip = self.live_NTs[tracker_id].nodetracker_ip
		
		# record completed and failed tasks reported by the NT
		for i in taskinfo_list:
			task_id_tag = self.constructTaskId(i.job_id, i.task_id, i.map_or_reduce)
			
			# any completed tasks are marked by the master as complete
			if i.type == COMPLETEDTASK_INFO_TYPE:
				if i.map_or_reduce == MAP:
					self.mark_map_complete(task_id_tag, i.struct_id, str(nodetracker_ip))
					self.__log.debug("Map task with job_id %s and task_id %s, struct_id %s reported complete by NT %s (%s)", str(i.job_id), str(i.task_id), str(i.struct_id), str(tracker_id), nodetracker_ip)
				else:
					self.mark_reduce_complete(task_id_tag, i.struct_id, str(nodetracker_ip))
					self.__log.debug("Reduce task with job_id %s and task_id %s, struct_id %s reported complete by NT %s (%s)", str(i.job_id), str(i.task_id), str(i.struct_id), str(tracker_id), nodetracker_ip)
			
			# any failed tasks are marked by the master as failed
			if i.type == FAILEDTASK_INFO_TYPE:
				self.mark_task_failed(task_id_tag)
				self.__log.error("A %s task with job_id %s and task_id %s, struct_id %s reported failed by NT %s (%s)", str(i.map_or_reduce), str(i.job_id), str(i.task_id), str(i.struct_id), str(tracker_id), nodetracker_ip)
		
		# take (and reset) the jobs completed since this NT's last heartbeat
		recent_jobs_completed = self.complete_jobs[tracker_id]
		self.complete_jobs[tracker_id] = []
		
		# assign tasks according to the number of fresh tasks the NT can run
		self.assignNewTasksToNT(tasks_to_run, actions, tracker_id)
		
		# marshalling the return values to be sent to the NT
		actions_str = pickle.dumps(actions)
		return actions_str, recent_jobs_completed
	
	
	def registerNT(self, nodetracker_ip):
		"""Register an NT with the master as soon as it comes up.

		Returns the tracker id assigned to the new NT.
		"""
		
		# get the next tracker-id and advance the counter
		tracker_id = self.newNTId
		self.newNTId += 1
		
		tracker_status = NodeTrackerStatus(tracker_id, nodetracker_ip)
		self.live_NTs[tracker_id] = tracker_status
		
		# start an empty completed-jobs list for this NT's heartbeats
		self.complete_jobs[tracker_id] = []
		
		self.__log.info("New tracker %s (%s) reporting for duty", str(tracker_id), nodetracker_ip)
		
		return tracker_id
	
	
	def removeNT(self, tracker_id):
		"""Remove an expired NT from the live NTs dict.
		(Currently a no-op placeholder.)
		"""
		
		pass
	
	
	def checkAndMarkMapsComplete(self, job_id, struct_id=STRUCT_ID_DEFAULT):
		"""Mark the struct's maps complete once no map is below state 2."""
		
		for map_data in self.maps_of_jobs[job_id][struct_id]:
			if map_data[0] < 2:
				return
		
		self.__log.info("///////// Maps for struct %s complete /////////", str(struct_id))
		self.jobs[job_id][2][struct_id] = True
	
	
	def markStructComplete(self, job_id, struct_id=STRUCT_ID_DEFAULT):
		"""Mark a struct of a job complete at the master and queue the
		completion notice for every live NT's next heartbeat.
		"""
		
		for nt_id in self.complete_jobs:
			self.complete_jobs[nt_id].append((job_id, struct_id))
		
		if job_id in self.completed_structs_job and struct_id in self.completed_structs_job[job_id]:
			self.completed_structs_job[job_id][struct_id] = True
			self.__log.info("///////////////////// Marking the Struct %s of Job %s as complete /////////////////////", str(struct_id), str(job_id))
	
	
	def localizeJob(self, job_id):
		"""Copy all essential files for a job from the DFS to the local FS.

		Creates the job's local input/output/compute directories, copies
		the job conf file down (replacing any stale local copy), then
		reinitializes the job's XML data. Errors are logged, not raised.
		"""
		try:
			self.__log.info("Initiating / copying config files for Job id %d", job_id)
			
			# get the job's local working directories from the jobs XML
			input_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, job_id)
			output_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir, job_id)
			compute_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_compute_temp_dir, job_id)
			
			# create directories
			if not os.path.isdir(input_path):
				os.makedirs(input_path)
			if not os.path.isdir(output_path):
				os.makedirs(output_path)
			if not os.path.isdir(compute_path):
				os.makedirs(compute_path)
			
			# copy job conf
			job_conf_local_dir = os.path.dirname(ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, job_id))
			job_conf_dfs_filepath = ma.const.JobsXmlData.get_dfs_filepath_str_data(ma.const.xml_dfs_path_job_conf, job_id)
			
			if self.hdfs.check_path(job_conf_dfs_filepath) == 1:
				# delete the local copy if it already exists
				local_jobconf_filepath = job_conf_local_dir + os.sep + os.path.basename(job_conf_dfs_filepath)
				if os.path.isfile(local_jobconf_filepath):
					os.remove(local_jobconf_filepath)
				
				self.hdfs.copy_file_to_local(job_conf_dfs_filepath, job_conf_local_dir, job_id, auto_retry=True)
			else:
				self.__log.error('Job Conf for job_id %d does not exist', job_id)
			
			# reinitialize the jobs XML data to remove all previous values
			ma.const.JobsXmlData.reinitialize(job_id)
			
		except Exception as err_msg:
			# localization is best-effort: log and continue
			self.__log.error('Error while localizing new job: %s', err_msg)



def main(argv=None):
	"""Start the master's XML-RPC server and serve requests forever.

	argv -- optional argument list (defaults to sys.argv[1:]); a single
	argument is interpreted as a job id to force on the Master.
	"""
	
	from xmlrpc.server import SimpleXMLRPCServer
	
	# honor the argv parameter instead of silently ignoring it
	#  (the original read sys.argv directly, making the parameter dead)
	if argv is None:
		argv = sys.argv[1:]
	
	master_ip = ma.const.XmlData.get_str_data(ma.const.xml_master_ip)
	master_port = ma.const.XmlData.get_int_data(ma.const.xml_master_port)
	
	# Create server
	print('master ip is: ', master_ip)
	print('master port: ', master_port)
	server = SimpleXMLRPCServer((master_ip, master_port))
	server.register_introspection_functions()
	
	if len(argv) == 1:
		forced_job_id = int(argv[0])
		print('--> Forced job id', forced_job_id)
		server.register_instance(Master(forced_job_id))
		print("Master has registered")
	else:
		server.register_instance(Master())
	
	# Run the server's main loop (never returns)
	#  (a large block of unreachable Python-2 scratch code that followed
	#  this call was removed)
	server.serve_forever()
			
# Program entry point: run the master's XML-RPC server loop.
if __name__ == "__main__":
	sys.exit(main())

	